{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exercise 4 - Introducing Actors\n",
    "\n",
    "**Goal:** The goal of this exercise is to show how to create an actor and how to call actor methods.\n",
    "\n",
    "See the documentation on actors at http://ray.readthedocs.io/en/latest/actors.html.\n",
    "\n",
    "Sometimes you need a \"worker\" process to have \"state\". For example, that state might be a neural network, a simulator environment, a counter, or something else entirely. However, remote functions are side-effect free. That is, they operate on inputs and produce outputs, but they don't change the state of the worker they execute on.\n",
    "\n",
    "Actors are different. When we instantiate an actor, a brand new worker is created, and all methods that are called on that actor are executed on the newly created worker.\n",
    "\n",
    "This means that with a single actor, no parallelism can be achieved because calls to the actor's methods will be executed one at a time. However, multiple actors can be created and methods can be executed on them in parallel.\n",
    "\n",
    "### Concepts for this Exercise - Actors\n",
    "\n",
    "To create an actor, decorate Python class with the `@ray.remote` decorator.\n",
    "\n",
    "```python\n",
    "@ray.remote\n",
    "class Example(object):\n",
    "    def __init__(self, x):\n",
    "        self.x = x\n",
    "    \n",
    "    def set(self, x):\n",
    "        self.x = x\n",
    "    \n",
    "    def get(self):\n",
    "        return self.x\n",
    "```\n",
    "\n",
    "Like regular Python classes, **actors encapsulate state that is shared across actor method invocations**.\n",
    "\n",
    "Actor classes differ from regular Python classes in the following ways.\n",
    "1. **Instantiation:** A regular class would be instantiated via `e = Example(1)`. Actors are instantiated via\n",
    "    ```python\n",
    "    e = Example.remote(1)\n",
    "    ```\n",
    "    When an actor is instantiated, a **new worker process** is created by a local scheduler somewhere in the cluster.\n",
    "2. **Method Invocation:** Methods of a regular class would be invoked via `e.set(2)` or `e.get()`. Actor methods are invoked differently.\n",
    "    ```python\n",
    "    >>> e.set.remote(2)\n",
    "    ObjectID(d966aa9b6486331dc2257522734a69ff603e5a1c)\n",
    "    \n",
    "    >>> e.get.remote()\n",
    "    ObjectID(7c432c085864ed4c7c18cf112377a608676afbc3)\n",
    "    ```\n",
    "3. **Return Values:** Actor methods are non-blocking. They immediately return an object ID and **they create a task which is scheduled on the actor worker**. The result can be retrieved with `ray.get`.\n",
    "    ```python\n",
>> ray.get(e.set">
    "    >>> print(ray.get(e.set.remote(2)))\n",
    "    None\n",
    "    \n",
    "    >>> ray.get(e.get.remote())\n",
    "    2\n",
    "    ```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from __future__ import absolute_import\n",
    "from __future__ import division\n",
    "from __future__ import print_function\n",
    "\n",
    "import numpy as np\n",
    "import ray\n",
    "import time"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ray.init(num_cpus=4, include_webui=False, ignore_reinit_error=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Change the `Foo` class to be an actor class by using the `@ray.remote` decorator."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class Foo(object):\n",
    "    def __init__(self):\n",
    "        self.counter = 0\n",
    "\n",
    "    def reset(self):\n",
    "        self.counter = 0\n",
    "\n",
    "    def increment(self):\n",
    "        time.sleep(0.5)\n",
    "        self.counter += 1\n",
    "        return self.counter\n",
    "\n",
    "assert hasattr(Foo, 'remote'), 'You need to turn \"Foo\" into an actor with @ray.remote.'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Change the intantiations below to create two actors by calling `Foo.remote()`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create two Foo objects.\n",
    "f1 = Foo()\n",
    "f2 = Foo()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**EXERCISE:** Parallelize the code below. The two actors can execute methods in parallel (though each actor can only execute one method at a time)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sleep a little to improve the accuracy of the timing measurements below.\n",
    "time.sleep(2.0)\n",
    "start_time = time.time()\n",
    "\n",
    "# Reset the actor state so that we can run this cell multiple times without\n",
    "# changing the results.\n",
    "f1.reset()\n",
    "f2.reset()\n",
    "\n",
    "# We want to parallelize this code. However, it is not straightforward to\n",
    "# make \"increment\" a remote function, because state is shared (the value of\n",
    "# \"self.counter\") between subsequent calls to \"increment\". In this case, it\n",
    "# makes sense to use actors.\n",
    "results = []\n",
    "for _ in range(5):\n",
    "    results.append(f1.increment())\n",
    "    results.append(f2.increment())\n",
    "\n",
    "end_time = time.time()\n",
    "duration = end_time - start_time\n",
    "\n",
    "assert not any([isinstance(result, ray.ObjectID) for result in results]), 'Looks like \"results\" is {}. You may have forgotten to call ray.get.'.format(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**VERIFY:** Run some checks to verify that the changes you made to the code were correct. Some of the checks should fail when you initially run the cells. After completing the exercises, the checks should pass."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "assert results == [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]\n",
    "\n",
    "assert duration < 3, ('The experiments ran in {} seconds. This is too '\n",
    "                      'slow.'.format(duration))\n",
    "assert duration > 2.5, ('The experiments ran in {} seconds. This is too '\n",
    "                        'fast.'.format(duration))\n",
    "\n",
    "print('Success! The example took {} seconds.'.format(duration))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.1"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": false,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {
    "height": "calc(100% - 180px)",
    "left": "10px",
    "top": "150px",
    "width": "382.391px"
   },
   "toc_section_display": true,
   "toc_window_display": true
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
